Przed oddaniem zadania upewnij się, że wszystko działa poprawnie. Uruchom ponownie kernel (z paska menu: Kernel$\rightarrow$Restart) a następnie wykonaj wszystkie komórki (z paska menu: Cell$\rightarrow$Run All).
Upewnij się, że wypełniłeś wszystkie pola TU WPISZ KOD lub TU WPISZ ODPOWIEDŹ, oraz
że podałeś swoje imię i nazwisko poniżej:
NAME = "Michal Marciniak"
Autokoder (ang. autoencoder) to model trenowany w zadaniu rekonstruowania wejścia. Zazwyczaj składa się z dwóch sieci neuronowych:
Innymi słowy, autokoder jest modelem mapującym wejście $\mathbf{x}$ na wyjście $\mathbf{r}$ poprzez jego wewnętrzną, ukrytą reprezentację $\mathbf{h}$.
Głównym celem trenowania modelu jest znalezienie najlepszej pary koder-dekoder, która zachowuje maksimum informacji podczas kodowania, co daje najmniejszy błąd rekonstrukcji $\mathcal{L}$:
$$(f^*, g^*) = \arg\min\mathcal{L}(\mathbf{x}, g(f(\mathbf{x}))).$$W przypadku autokoderów często używaną funkcją kosztu jest błąd średniokwadratowy.
Klasyczne autokodery używane są w zwykle w celu redukcji wymiarowości lub wstępnego uczenia cech do modelu. Ze względu na brak wykorzystania etykiet, modele te są trenowane w sposób nienadzorowany.
Wykorzystanie wyłącznie błędu rekonstrukcji jako funkcji celu w klasycznych autokoderach wymusza na modelu uczenie się skompresowanej reprezentacji danych, jednak często może prowadzić do jego przetrenowania, przez co jego zdolności generatywne są ograniczone (głównie ze względu na nieregularną przestrzeń ukrytą).
Rozwiązaniem pozwalającym na wyuczenie się reprezentacji o wyższej jakości są autokodery wariacyjne (ang. variational autoencoders). Jest to model generatywny, gdzie zamiast uczenia funkcji kodera mapującej wejście do przestrzeni ukrytej będziemy próbowali uczyć się nieznanego rozkładu danych $p_{\theta^*}(\mathbf{z})$.
Skupmy się na modelu generatywnym $p_\theta(\mathbf{z})p_\theta(\mathbf{x} | \mathbf{z})$, oznaczonym liniami ciągłymi. Zakładamy, że przykłady ze zbioru danych $\mathbf{X} = \left\{x^{(i)}\right\}_{i=1}^N$, składającego się z $N$ niezależnych i pochodzących z tego samego rozkładu przykładów, generowane są przez proces losowy, w którym występuje nieobserwowana, ciągła zmienna losowa $\mathbf{z}$. Proces ten składa się z dwóch kroków:
Zakładamy tutaj, że rozkłady $p_{\theta^*}(\mathbf{z})$ oraz $p_{\theta^*}(\mathbf{x} | \mathbf{z})$ należą do rodzin rozkładów $p_{\theta}(\mathbf{z})$ oraz $p_{\theta}(\mathbf{x} | \mathbf{z})$, parametryzowanych przez $\theta$; zakładamy że ich funkcje gęstości są różniczkowalne względem $\theta$ i $\mathbf{z}$. Zależność $\mathbf{x}^{(i)}$ od $\mathbf{z}^{(i)}$ będziemy modelować przy użyciu sieci neuronowej o parametrach $\theta$.
Parametry te moglibyśmy znaleźć maksymalizując likelihood:
$$p(\mathbf{x}) = \int p_\theta(\mathbf{x} | \mathbf{z})p(\mathbf{z})d\mathbf{z}.$$Nie jest to jednak możliwe, ze względu na całkowanie po wszystkich wartościach priora. Wprowadzimy zatem model probabilistycznego kodera $q_\phi(\mathbf{z} | \mathbf{x})$ (oznaczony linią przerywaną) - aproksymację prawdziwego posteriora $p_\theta(\mathbf{z} | \mathbf{x})$ - rozkład wariacyjny, najczęściej normalny. Będziemy go modelować przy użyciu sieci neuronowej o parametrach $\theta$. W tym kontekście, model generatywny $p_{\theta^*}(\mathbf{x} | \mathbf{z})$ możemy traktować jako probabilistyczny dekoder.
Możemy zatem sformułować naszą funkcję celu w następujący sposób:
$$ \begin{align} \log p_\theta\left(\mathbf{x}\right) & = \mathbb{E}_{\mathbf{z} \sim q_\phi\left(\mathbf{z} | \mathbf{x}\right)} \left[\log p_\theta\left(\mathbf{x}\right)\right] & \text{$p_\theta\left(\mathbf{x}\right)$ jest niezależne od $\mathbf{z}$}\\ & = \mathbb{E}_{\mathbf{z}}\left[\log\frac{p_\theta\left(\mathbf{x} | \mathbf{z}\right)p\left(\mathbf{z}\right)}{p\left(\mathbf{z} | \mathbf{x}\right)}\right] & \text{Reguła Bayesa}\\ & = \mathbb{E}_{\mathbf{z}}\left[\log\frac{p_\theta\left(\mathbf{x} | \mathbf{z}\right)p\left(\mathbf{z}\right)}{p\left(\mathbf{z} | \mathbf{x}\right)}\frac{q_\phi\left(\mathbf{z} | \mathbf{x}\right)}{q_\phi\left(\mathbf{z} | \mathbf{x}\right)}\right] & \text{pomnożyć przez 1}\\ & = \mathbb{E}_{\mathbf{z}}\left[\log p_\theta\left(\mathbf{x} | \mathbf{z}\right)\right] - \mathbb{E}_{\mathbf{z}}\left[\log \frac{q_\phi\left(\mathbf{z} | \mathbf{x}\right)}{p\left(\mathbf{z}\right)}\right] + \mathbb{E}_{\mathbf{z}}\left[\log \frac{q_\phi\left(\mathbf{z} | \mathbf{x}\right)}{p\left(\mathbf{z} | \mathbf{x}\right)}\right] & \text{logarytm}\\ & = \underbrace{\mathbb{E}_{\mathbf{z}}\left[\log p_\theta\left(\mathbf{x} | \mathbf{z}\right)\right] - D_{KL}\left(q_\phi\left(\mathbf{z} | \mathbf{x}\right)\| p\left(\mathbf{z}\right)\right)}_{\mathcal{L}\left(\mathbf{x}, \theta, \phi\right)} + \underbrace{D_{KL}\left(q_\phi\left(\mathbf{z} | \mathbf{x}\right)\| p\left(\mathbf{z} | \mathbf{x}\right)\right)}_{\ge 0}\\ \end{align} $$Dywergencji Kulbacka-Leiblera między prawdziwym posteriorem i jego aproksymacją nie możemy aproksymować wprost, wiemy jednak że jest zawsze większa lub równa zero. Z tą wiedzą możemy przekształcić to równanie do postaci nierówności i uzyskać funkcję kosztu ELBO:
$$\\log p_\theta \left(\mathbf{x}\right) \ge \underbrace{\mathbb{E}_{\mathbf{z}}\left[\log p_\theta\left(\mathbf{x} | \mathbf{z}\right)\right]}_{\text{błąd rekonstrukcji}} - \underbrace{D_{KL}\left(q_\phi\left(\mathbf{z} | \mathbf{x}\right)\| p\left(\mathbf{z}\right)\right)}_{\text{regularyzacja aproksymacji posteriora}} = \mathcal{L}\left(\mathbf{x}, \theta, \phi\right).$$Musimy zatem jeszcze przyjąć rozkład prior - najczęściej przyjmuje się rozkład standardowy $\mathcal{N}(0, 1)$.
Model ten uczony będzie metodą Maximum Likelihood Estimation:
$$\theta^*, \phi^* = \underset{\theta, \phi}{\arg\max} \sum_{i=1}^N \mathcal{L}\left(x_i, \theta, \phi\right).$$Zaczniemy od implementacji klasycznego autokodera z wykorzystaniem biblioteki PyTorch. Będziemy się opierać na klasie bazowej BaseAutoencoder, której metody należy zaimplementować aby możliwe było wykorzystanie przygotowanych analiz.
from typing import Tuple
import matplotlib.pyplot as plt
import numpy as np
import pyro
import pyro.distributions as dist
import torch
import torch.nn as nn
from IPython.display import Code, display
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
from tqdm.notebook import tqdm
from src.ae import BaseAutoEncoder
from src.utils import train_ae, AutoEncoderAnalyzer
np.random.seed(2021)
torch.manual_seed(2021)
display(Code(filename="src/ae.py"))
Tworzymy model kodera i dekodera: oba zawierajÄ… po jednej warstwie ukrytej oraz odpowiednie funkcje aktywacji.
class Encoder(nn.Module):
"""Encoder module; function h."""
def __init__(
self,
n_input_features: int,
n_hidden_neurons: int,
n_latent_features: int,
):
"""
:param n_input_features: number of input features (28 x 28 = 784 for MNIST)
:param n_hidden_neurons: number of neurons in hidden FC layer
:param n_latent_features: size of the latent vector
"""
super().__init__()
self.input_to_hidden = nn.Linear(n_input_features, n_hidden_neurons)
self.hidden_to_latent = nn.Linear(n_hidden_neurons, n_latent_features)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Encoder forward function."""
h = self.input_to_hidden(x)
h = nn.functional.relu(h)
h = self.hidden_to_latent(h)
return h
class Decoder(nn.Module):
"""Decoder module; function g."""
def __init__(
self,
n_latent_features: int,
n_hidden_neurons: int,
n_output_features: int,
):
"""
:param n_latent_features: number of latent features (same as in Encoder)
:param n_hidden_neurons: number of neurons in hidden FC layer
:param n_output_features: size of the output vector (28 x 28 = 784 for MNIST)
"""
super().__init__()
self.latent_to_hidden = nn.Linear(n_latent_features, n_hidden_neurons)
self.hidden_to_output = nn.Linear(n_hidden_neurons, n_output_features)
def forward(self, h: torch.Tensor) -> torch.Tensor:
"""Decoder forward function."""
r = self.latent_to_hidden(h)
r = nn.functional.relu(r)
r = self.hidden_to_output(r)
r = torch.sigmoid(r)
return r
Modele te wykorzystujemy do zaimplementowania Autokodera; implementujemy metody encoder_forward oraz decoder_forward, które służą do tworzenia ukrytej reprezentacji oraz rekonstruowania na jej podstawie obrazu wejściowego.
class Autoencoder(BaseAutoEncoder):
"""Auto encoder module."""
def __init__(
self,
n_data_features: int,
n_encoder_hidden_features: int,
n_decoder_hidden_features: int,
n_latent_features: int,
):
"""
:param n_data_features: number of input and output features (28 x 28 = 784 for MNIST)
:param n_encoder_hidden_features: number of neurons in encoder's hidden layer
:param n_decoder_hidden_features: number of neurons in decoder's hidden layer
:param n_latent_features: number of latent features
"""
encoder = Encoder(
n_input_features=n_data_features,
n_hidden_neurons=n_encoder_hidden_features,
n_latent_features=n_latent_features,
)
decoder = Decoder(
n_latent_features=n_latent_features,
n_hidden_neurons=n_decoder_hidden_features,
n_output_features=n_data_features,
)
super().__init__(
encoder=encoder, decoder=decoder, n_latent_features=n_latent_features
)
self.input_shape = None
def encoder_forward(self, x: torch.Tensor) -> torch.Tensor:
"""Function to perform forward pass through encoder network."""
if self.input_shape is None:
self.input_shape = x.shape[1:]
x = x.view(x.shape[0], -1)
return self.encoder(x)
def decoder_forward(self, x: torch.Tensor) -> torch.Tensor:
"""Function to perform forward pass through decoder network."""
return self.decoder(x).view(-1, *self.input_shape)
W zadaniu ponownie wykorzystamy zbiór MNIST, zawierający odręcznie zapisane cyfry w formie obrazów o rozdzielczości $28\times28$, z wartościami w przedziale $[0, 255]$ (funkcja ToTensor() przetransformuje je do zakresu $[0, 1]$). Zbiór treningowy ograniczamy do 10000 przykładów.
train_dataset = MNIST(root="data", download=True, train=True, transform=ToTensor())
val_dataset = MNIST(root="data", download=True, train=False, transform=ToTensor())
# limiting the dataset
indices = np.random.permutation(len(train_dataset.data))[:10_000]
train_dataset.data = train_dataset.data[indices]
train_dataset.targets = train_dataset.targets[indices]
Definiujemy model wraz z arbitralnie dobranymi hiperparametrami oraz funkcję kosztu (MSE). Następnie wywołujemy przygotowaną pętlę uczenia w celu wytrenowania modelu. Parametry dobrane tutaj powinny spowodować przetrenowanie autokodera.
batch_size = 32
lr = 1e-2
epochs = 20
ae_model = Autoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = nn.MSELoss()
train_ae(
ae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
)
Poniżej znajdują się wywołania analiz działania modelu: porównanie rekonstrukcji z obrazami wejściowymi, uśrednione punkty reprezentujące każdą klasę, badanie zdolności generatywnych modelu przez modyfikowanie wektora ukrytego oraz wizualizację przestrzeni ukrytej.
analyzer = AutoEncoderAnalyzer(model=ae_model, dataset=val_dataset, n_samplings=1)
analyzer.compare_reconstruction_with_original()
plt.show()
analyzer.average_points_per_class()
plt.show()
for digit, latent_code in enumerate(analyzer._averages):
print(f"Digit: {digit}")
analyzer.analyze_features(latent_code, steps=11)
plt.show()
analyzer.analyze_tsne() # this may take quite a long time
plt.show()
Do wytrenowania modelu VAE wykorzystamy bibliotekę Pyro. Funkcja ta dostarcza gotową implementację funkcji kosztu ELBO oraz metody SVI wykorzystywanej do trenowania modelu. Detale implementacyjne znajdują się w przygotowanej funkcji train_ae, którą wykorzystywaliśmy też wcześniej do trenowania modelu autokodera. Pyro wymaga od nas przygotowania funkcji model oraz guide. Pierwszy z nich ma definiować model generatywny $p_\theta(\mathbf{x} | \mathbf{z})p_\theta(\mathbf{z})$, drugi natomiast odpowiada za definicję aproksymacji posteriora $q_\phi(\mathbf{z} | \mathbf{x})$. Z nimi możemy wykorzystać klasę Trace_ELBO, która posłuży jako funkcja kosztu. Zachęcamy do zapoznania się z dokumentacją, gdzie znajduje się więcej informacji na temat modelu oraz guide'a w bibliotece Pyro.
WzorujÄ…c siÄ™ na implementacji klasycznego autokodera, zaimplementuj model autokodera wariacyjnego:
VariationalAutoencoder:encoder_forward, która dla danego wejścia wygeneruje jego ukrytą reprezentację. Wykorzystaj parametry rozkładu generowane przez koder oraz wykonaj w tej funkcji próbkowanie.decoder_forward, która dla danej ukrytej reprezentacji (wypróbkowanej z rozkładu) wygeneruje jego rekonstrukcję.Zwróć uwagę, że w analizach doszło porównanie kilku próbek z rozkładu ukrytego modelu.
Zbadaj wpływ hiperparametrów modelu wariacyjnego autokodera (akie jak liczba neuronów w warstwach ukrytych, rozmiar ukrytej reprezentacji, współczynnik uczenia, itp.) na proces jego trenowania (szybkość, zdolność do wytrenowania, szybkość zbiegania itd.), uzyskiwane rezultaty oraz zdolności generatywne i właściwości przestrzeni ukrytej. Wykorzystaj przygotowaną klasę AutoEncoderAnalyzer. Zapisz wnioski w komórce Markdown.
class VEncoder(nn.Module):
"""Encoder for VAE."""
def __init__(
self,
n_input_features: int,
n_hidden_neurons: int,
n_latent_features: int,
):
"""
:param n_input_features: number of input features (28 x 28 = 784 for MNIST)
:param n_hidden_neurons: number of neurons in hidden FC layer
:param n_latent_features: size of the latent vector
"""
super().__init__()
# TU WPISZ KOD
self.input_to_hidden = nn.Linear(n_input_features, n_hidden_neurons)
self.locs = nn.Linear(n_hidden_neurons, n_latent_features)
self.scales = nn.Linear(n_hidden_neurons, n_latent_features )
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Encode data to gaussian distribution params."""
z_loc = None
z_scale = None
# TU WPISZ KOD
h = self.input_to_hidden(x)
h = nn.functional.relu(h)
z_loc = self.locs(h)
z_scale = self.scales(h)
z_scale = nn.functional.softplus(z_scale)
return z_loc, z_scale
class VDecoder(nn.Module):
"""Decoder for VAE."""
def __init__(
self,
n_latent_features: int,
n_hidden_neurons: int,
n_output_features: int,
):
"""
:param n_latent_features: number of latent features (same as in Encoder)
:param n_hidden_neurons: number of neurons in hidden FC layer
:param n_output_features: size of the output vector (28 x 28 = 784 for MNIST)
"""
super().__init__()
# TU WPISZ KOD
self.latent_to_hidden = nn.Linear(n_latent_features, n_hidden_neurons)
self.hidden_to_output = nn.Linear(n_hidden_neurons, n_output_features)
def forward(self, z: torch.Tensor) -> torch.Tensor:
"""Decode latent vector to image."""
r = self.latent_to_hidden(z)
r = nn.functional.softplus(r)
r = self.hidden_to_output(r)
r = torch.sigmoid(r)
return r
class VariationalAutoencoder(BaseAutoEncoder):
"""Variational Auto Encoder model."""
def __init__(
self,
n_data_features: int,
n_encoder_hidden_features: int,
n_decoder_hidden_features: int,
n_latent_features: int,
):
"""
:param n_data_features: number of input and output features (28 x 28 = 784 for MNIST)
:param n_encoder_hidden_features: number of neurons in encoder's hidden layer
:param n_decoder_hidden_features: number of neurons in decoder's hidden layer
:param n_latent_features: number of latent features
"""
encoder = VEncoder(
n_input_features=n_data_features,
n_hidden_neurons=n_encoder_hidden_features,
n_latent_features=n_latent_features,
)
decoder = VDecoder(
n_latent_features=n_latent_features,
n_hidden_neurons=n_decoder_hidden_features,
n_output_features=n_data_features,
)
super().__init__(
encoder=encoder, decoder=decoder, n_latent_features=n_latent_features
)
self.input_shape = None
def encoder_forward(self, x: torch.Tensor) -> torch.Tensor:
"""Function to perform forward pass through encoder network.
takes: tensor of shape [batch_size x [image-size]] (input images batch)
returns: tensor of shape [batch_size x latent_feature_size] (latent vector)
"""
z = None
if self.input_shape is None:
self.input_shape = x.shape[1:]
x = x.view(x.shape[0], -1)
# TU WPISZ KOD
loc, scale = self.encoder(x)
z = loc + scale * torch.randn(scale.shape)
return z
def decoder_forward(self, z: torch.Tensor) -> torch.Tensor:
"""Function to perform forward pass through decoder network.
takes: tensor of shape [batch_size x latent_feature_size] (latent vector)
returns: tensor of shape [batch_size x [image-size]] (reconstructed images batch)
"""
r = None
# TU WPISZ KOD
r = self.decoder(z)
return r.view(-1, *self.input_shape)
def model(self, x: torch.Tensor):
"""Pyro model for VAE; p(x|z)p(z)."""
pyro.module("decoder", self.decoder)
with pyro.plate("data", x.shape[0]):
z_loc = torch.zeros((x.shape[0], self.n_latent_features))
z_scale = torch.ones((x.shape[0], self.n_latent_features))
z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
output = self.decoder.forward(z).view(-1, *self.input_shape)
pyro.sample("obs", dist.Bernoulli(output).to_event(3), obs=x)
def guide(self, x: torch.Tensor):
"""Pyro guide for VAE; q(z|x)"""
pyro.module("encoder", self.encoder)
with pyro.plate("data", x.shape[0]):
z_loc, z_scale = self.encoder.forward(x.view(x.shape[0], -1))
pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
batch_size = 32
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
analyzer.compare_samplings()
plt.show()
analyzer.average_points_per_class()
plt.show()
for digit, latent_code in enumerate(analyzer._averages):
print(f"Digit: {digit}")
analyzer.analyze_features(latent_code, steps=11)
plt.show()
analyzer.analyze_tsne() # this may take quite a long time
plt.show()
batch_size = 32
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=30, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
batch_size = 32
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=5, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
batch_size = 32
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=40, # chosen arbitrarily
n_decoder_hidden_features=40, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
Zastosowanie wymiaru przestrzeni ukrytej = 30
batch_size = 32
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=256, # chosen arbitrarily
n_decoder_hidden_features=256, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
batch_size = 32
lr = 1e-2
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
batch_size = 32
lr = 1e-4
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
batch_size = 2
lr = 1e-3
epochs = 20
vae_model = VariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (vae_model.model, vae_model.guide)
train_ae(
vae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=vae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
Zastosowanie wymiaru warstwy ukrytej = 30 nie przyniosło znacznej poprawy względem wymiaru = 10. Natomiast zastosowanie wymiaru = 5 sprawiło, że generowane obrazy były 'rozmyte'. Model nauczył się nie więcej kształtu danej cyfry, bez możliwości rysowania ostrych granic. Wynika to z niedostatecznej zdolności do zakodowania pełnej informacji w 5 wymiarowej przestrzenii ukrytej. Wykorzystanie 40 neuronów w warstwach ukrytych zamiast 128 sprawiło, że model uczył się troche wolniej. Jednakże ostatecznie jego zdolności generatywne są zbliżone do modelu zawierającego 128 neuronów ukrytych (a przynajmniej wykorzystując ludzkie oko jako przyrzad pomiarowy). Zastosowanie natomiast 256 neuronów, nie przyniosło znacznej poprawy względem domyślnej konfiguracji, co może być związane z 'przystępnym poziomem trudności' zbioru MNIST (tak duża liczba neuronów nie jest potrzebna do rozwiązywania tego zadania). Wpływ współczynnika uczenia był spodziewany. Większe wartości pozwalają na szybsze dojście do poziomu wysycenia krzywej uczenia, a mniejsze sprawiają, że model dłużej dąży do tego optimum. Na koniec zbadano wpływ zastosowania bardzo małego batch'a o rozmiarze 2. Wyniki pokazują jak chaotyczny jest przebieg uczenia. Jednakże obserwując wygenerowane próbki można uznać, że model wyuczył się zdolności generatywnych.
Metody uczenia reprezentacji, do których można zaliczyć autokoder wariacyjny, są często sprawdzane pod względem możliwości ich zastosowania w tzw. downstream tasks, czyli prostych zadaniach mających na celu weryfikację jakości utworzonej reprezentacji danych. Polegają one np. na wytrenowaniu modelu do jakiegoś zadania nie na danych, ale na ich reprezentacji, wytworzonej przez model uczenia reprezentacji. W tym przypadku tym zadaniem będzie klasyfikacja cyfr.
Wybierz dowolny klasyfikator (ważne: klasyfikator ten powinien osiągać słabe rezultaty dla zbioru MNIST). Zbadaj, jakie wartości metryk osiąga on przy zastosowaniu wprost na danych ze zbioru MNIST; sprawdź także ile czasu zajmuje trenowanie klasyfikatora oraz wnioskowanie.
Następnie zastosuj ten sam klasyfikator na ukrytych reprezentacjach wytworzonych przez oba modele autokodera: Autoencoder oraz VariationalAutoencoder, wytrenowane wcześniej (odpowiednio ae_model oraz vae_model). Przetwórz cały zbiór treningowy i walidacyjny z użyciem kodera w celu uzyskania ukrytych reprezentacji przykładów, a następnie wykorzystaj je do wytrenowania prostego klasyfikatora. Porównaj uzyskane metryki oraz szybkość działania.
X_train, y_train = zip(*train_dataset)
X_train = torch.stack(X_train)
X_val, y_val = zip(*val_dataset)
X_val = torch.stack(X_val)
# TU WPISZ KOD
X_train.shape
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn.svm import SVC
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
neigh = KNeighborsClassifier(n_neighbors=10)
neigh.fit(X_train.view(X_train.shape[0],-1), y_train)
prediction = neigh.predict(X_val.view(X_val.shape[0], -1))
print(classification_report(y_val, prediction))
svc = SVC()
svc.fit(X_train.view(X_train.shape[0], -1), y_train)
prediction = svc.predict(X_val.view(X_val.shape[0], -1))
print(classification_report(y_val, prediction))
class WeakClassifier(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.hidden_size = hidden_size
self.input_size = input_size
self.output_size = output_size
self.model = nn.Sequential(
nn.Linear(self.input_size, self.hidden_size),
nn.ReLU(inplace=True),
nn.Linear(self.hidden_size, self.output_size)
)
def forward(self, x):
x = x.view(x.shape[0], -1)
return self.model(x)
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm, trange
def count_correct(
y_pred: torch.Tensor, y_true: torch.Tensor
) -> torch.Tensor:
preds = torch.argmax(y_pred, dim=1)
return (preds == y_true).float().sum()
def validate(
model: nn.Module,
loss_fn: torch.nn.CrossEntropyLoss,
dataloader: DataLoader
) -> Tuple[torch.Tensor, torch.Tensor]:
loss = 0
correct = 0
all = 0
for X_batch, y_batch in dataloader:
y_pred = model(X_batch)
all += len(y_pred)
loss += loss_fn(y_pred, y_batch).sum()
correct += count_correct(y_pred, y_batch)
return loss / all, correct / all
def fit(model: nn.Module, optimizer: optim.Optimizer,
loss_fn: nn.CrossEntropyLoss, train_dl: DataLoader,
val_dl: DataLoader, epochs: int, print_metrics: str = True):
train_metrics = {
'loss': [],
'acc': []
}
val_metrics = {
'loss': [],
'acc': []
}
for epoch in trange(epochs, desc='epoch'):
model.train()
pbar = tqdm(train_dl, desc='step', leave=False)
for X_batch, y_batch in pbar:
y_pred = model(X_batch)
loss = loss_fn(y_pred, y_batch)
loss.backward()
optimizer.step()
optimizer.zero_grad()
pbar.update(1)
pbar.close()
if print_metrics:
model.eval()
with torch.no_grad():
train_loss, train_acc = validate(
model=model, loss_fn=loss_fn, dataloader=train_dl
)
val_loss, val_acc = validate(
model=model, loss_fn=loss_fn, dataloader=val_dl
)
print(
f"Epoch {epoch}:"
f"train loss = {train_loss:.3f} (acc: {train_acc:.3f}), "
f"validation loss = {val_loss:.3f} (acc: {val_acc:.3f})"
)
train_metrics['loss'].append(train_loss)
train_metrics['acc'].append(train_acc)
val_metrics['loss'].append(val_loss)
val_metrics['acc'].append(val_acc)
if print_metrics:
plot_results(train_metrics, val_metrics)
return model
def plot_results(train_metrics, val_metrics):
fig, (ax1, ax2) = plt.subplots(1,2, figsize=(12,6))
ax1.plot(train_metrics['loss'], label='train')
ax1.plot(val_metrics['loss'], label='val')
ax1.set_title('Loss (Epoch)')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss value')
ax2.plot(train_metrics['acc'], label='train')
ax2.plot(val_metrics['acc'], label='val')
ax2.set_title('ACC (Epoch)')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('ACC value')
ax1.legend()
ax2.legend()
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
weak_classifier = WeakClassifier(input_size=28*28, hidden_size=5,output_size=10)
optimiser = optim.Adam(weak_classifier.parameters())
loss_fn = nn.CrossEntropyLoss()
weak_classifier = fit(weak_classifier, optimiser, loss_fn, train_dataloader, val_loader,
50, True)
class WeakClassifierWithAutoEncoder(nn.Module):
def __init__(self, latent_size, hidden_size, output_size, encoder):
super().__init__()
self.hidden_size = hidden_size
self.latent_size = latent_size
self.output_size = output_size
self.encoder = encoder
for param in self.encoder.parameters():
param.requires_grad = False
self.model = nn.Sequential(
nn.Linear(self.latent_size, self.hidden_size),
nn.ReLU(inplace=True),
nn.Linear(self.hidden_size, self.output_size)
)
def forward(self, x):
x = x.view(x.shape[0], -1)
latent = self.encoder.encoder_forward(x)
return self.model(latent)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
weak_classifier = WeakClassifierWithAutoEncoder(latent_size=10, hidden_size=5,
output_size=10, encoder=ae_model)
optimiser = optim.Adam(weak_classifier.parameters())
loss_fn = nn.CrossEntropyLoss()
weak_classifier = fit(weak_classifier, optimiser, loss_fn, train_dataloader, val_loader,
50, True)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
weak_classifier = WeakClassifierWithAutoEncoder(latent_size=10, hidden_size=5,
output_size=10, encoder=vae_model)
optimiser = optim.Adam(weak_classifier.parameters())
loss_fn = nn.CrossEntropyLoss()
weak_classifier = fit(weak_classifier, optimiser, loss_fn, train_dataloader, val_loader,
50, True)
Wykorzystując latent space z AE oraz VAE osiągano trochę gorsze rezultaty niż w przypadku zwykłej sieci neuronowej trenowanej end-to-end. Jednakże w przypadku modeli korzystających z latent space'a zauważono znacznie mniejszy potencjał do przetrenowania, co w dłuższej perspektywie mogłoby wpłynąć na przewagę tych 2 modeli (lepsze zdolności generalizacyjne).
Jednym z fundamentalnych celów uczenia reprezentacji jest dążenie do uzyskania rozłącznych cech (co oznacza, że zmiana pojedynczego elementu wektora ukrytego spowoduje zmianę tylko jednej cechy obrazu wyjściowego). Poprzednie modele nie są w stanie uzyskać tego rezultatu - zmiana pojedynczego elementu wektora wpływa zazwyczaj na więcej niż jedną cechę obrazu wyjściowego.
Jedno z rozwiązań pozwalające na uzyskanie rozłącznych cech jest $\beta$-VAE. Zaproponowana modyfikacja polega na wprowadzeniu współczynnika regularyzacji $\beta$ do funkcji kosztu, dzięki któremu możemy regulować wpływ regularyzacji aproksymacji posteriora na rezultaty trenowania:
$$\log p_\theta \left(\mathbf{x}\right) \ge \mathcal{L}\left(\mathbf{x}, \theta, \phi, \beta\right) = \underbrace{\mathbb{E}_{\mathbf{z}}\left[\log p_\theta\left(\mathbf{x} | \mathbf{z}\right)\right]}_{\text{błąd rekonstrukcji}} - \overbrace{\beta}^{\text{współczynnik regularyzacji}}\underbrace{\left(D_{KL}\left(q_\phi\left(\mathbf{z} | \mathbf{x}\right)\| p\left(\mathbf{z}\right)\right)\right)}_{\text{regularyzacja aproksymacji posteriora}}.$$Publikacja: link
Zadanie polega na implementacji modelu $\beta$-VAE. Wykorzystaj jak najwięcej komponentów klasy VariationalAutoencoder. Podpowiedź: należy zmodyfikować model oraz guide, wykorzystując narzędzia modyfikujące obliczanie score'ów (effect handlers) w Pyro: Poutine. Przeanalizuj model z użyciem AutoEncoderAnalyzer - w szczególności pod względem uzyskiwanej reprezentacji ukrytej, zdolności generatywnych oraz wpływu zmian współczynnika $\beta$.
class BetaVariationalAutoencoder(VariationalAutoencoder):
"""beta-Variational Auto Encoder model."""
def __init__(
self,
n_data_features: int,
n_encoder_hidden_features: int,
n_decoder_hidden_features: int,
n_latent_features: int,
beta: float):
super().__init__(n_data_features, n_encoder_hidden_features,
n_decoder_hidden_features, n_latent_features)
self.beta = beta
# TU WPISZ KOD
def model(self, x: torch.Tensor):
"""Pyro model for VAE; p(x|z)p(z)."""
pyro.module("decoder", self.decoder)
with pyro.plate("data", x.shape[0]):
z_loc = torch.zeros((x.shape[0], self.n_latent_features))
z_scale = torch.ones((x.shape[0], self.n_latent_features))
with pyro.poutine.scale(None, self.beta):
z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
output = self.decoder.forward(z).view(-1, *self.input_shape)
pyro.sample("obs", dist.Bernoulli(output).to_event(3), obs=x)
def guide(self, x: torch.Tensor):
"""Pyro guide for VAE; q(z|x)"""
pyro.module("encoder", self.encoder)
with pyro.plate("data", x.shape[0]):
z_loc, z_scale = self.encoder.forward(x.view(x.shape[0], -1))
with pyro.poutine.scale(None, self.beta):
pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
batch_size = 32
lr = 1e-3
epochs = 20
bvae_model = BetaVariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
beta=3. # should limit the number of valuable features
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (bvae_model.model, bvae_model.guide)
train_ae(
bvae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=bvae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
analyzer.compare_samplings()
plt.show()
analyzer.average_points_per_class()
plt.show()
for digit, latent_code in enumerate(analyzer._averages):
print(f"Digit: {digit}")
analyzer.analyze_features(latent_code, steps=11)
plt.show()
analyzer.analyze_tsne() # this may take quite a long time
plt.show()
batch_size = 32
lr = 1e-3
epochs = 20
bvae_model = BetaVariationalAutoencoder(
n_data_features=28 * 28, # MNIST pixels
n_encoder_hidden_features=128, # chosen arbitrarily
n_decoder_hidden_features=128, # chosen arbitrarily
n_latent_features=10, # how many features will be used to represent input
beta=8. # should limit the number of valuable features
)
train_dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, drop_last=False
)
loss_fn = pyro.infer.Trace_ELBO().differentiable_loss
loss_fn_args = (bvae_model.model, bvae_model.guide)
train_ae(
bvae_model,
epochs=epochs,
train_loader=train_dataloader,
val_loader=val_loader,
lr=lr,
loss_fn=loss_fn,
loss_fn_args=loss_fn_args,
)
analyzer = AutoEncoderAnalyzer(model=bvae_model, dataset=val_dataset, n_samplings=5)
analyzer.compare_reconstruction_with_original()
plt.show()
analyzer.compare_samplings()
plt.show()
analyzer.average_points_per_class()
plt.show()
analyzer.analyze_tsne() # this may take quite a long time
plt.show()
for digit, latent_code in enumerate(analyzer._averages):
print(f"Digit: {digit}")
analyzer.analyze_features(latent_code, steps=11)
plt.show()